home *** CD-ROM | disk | FTP | other *** search
/ Chip 2007 January, February, March & April / Chip-Cover-CD-2007-02.iso / Pakiet bezpieczenstwa / mini Pentoo LiveCD 2006.1 / mpentoo-2006.1.iso / livecd.squashfs / usr / lib / python2.4 / test / test_socketserver.pyc (.txt) < prev    next >
Python Compiled Bytecode  |  2005-10-18  |  7KB  |  246 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.4)
  3.  
  4. from test import test_support
  5. from test.test_support import verbose, verify, TESTFN, TestSkipped
  6. test_support.requires('network')
  7. from SocketServer import *
  8. import socket
  9. import select
  10. import time
  11. import threading
  12. import os
  13. NREQ = 3
  14. DELAY = 0.5
  15.  
  16. class MyMixinHandler:
  17.     
  18.     def handle(self):
  19.         time.sleep(DELAY)
  20.         line = self.rfile.readline()
  21.         time.sleep(DELAY)
  22.         self.wfile.write(line)
  23.  
  24.  
  25.  
  26. class MyStreamHandler(MyMixinHandler, StreamRequestHandler):
  27.     pass
  28.  
  29.  
  30. class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler):
  31.     pass
  32.  
  33.  
  34. class MyMixinServer:
  35.     
  36.     def serve_a_few(self):
  37.         for i in range(NREQ):
  38.             self.handle_request()
  39.         
  40.  
  41.     
  42.     def handle_error(self, request, client_address):
  43.         self.close_request(request)
  44.         self.server_close()
  45.         raise 
  46.  
  47.  
  48. teststring = 'hello world\n'
  49.  
  50. def receive(sock, n, timeout = 20):
  51.     (r, w, x) = select.select([
  52.         sock], [], [], timeout)
  53.     if sock in r:
  54.         return sock.recv(n)
  55.     else:
  56.         raise RuntimeError, 'timed out on %r' % (sock,)
  57.  
  58.  
  59. def testdgram(proto, addr):
  60.     s = socket.socket(proto, socket.SOCK_DGRAM)
  61.     s.sendto(teststring, addr)
  62.     buf = data = receive(s, 100)
  63.     while data and '\n' not in buf:
  64.         data = receive(s, 100)
  65.         buf += data
  66.     verify(buf == teststring)
  67.     s.close()
  68.  
  69.  
  70. def teststream(proto, addr):
  71.     s = socket.socket(proto, socket.SOCK_STREAM)
  72.     s.connect(addr)
  73.     s.sendall(teststring)
  74.     buf = data = receive(s, 100)
  75.     while data and '\n' not in buf:
  76.         data = receive(s, 100)
  77.         buf += data
  78.     verify(buf == teststring)
  79.     s.close()
  80.  
  81.  
  82. class ServerThread(threading.Thread):
  83.     
  84.     def __init__(self, addr, svrcls, hdlrcls):
  85.         threading.Thread.__init__(self)
  86.         self._ServerThread__addr = addr
  87.         self._ServerThread__svrcls = svrcls
  88.         self._ServerThread__hdlrcls = hdlrcls
  89.  
  90.     
  91.     def run(self):
  92.         
  93.         class svrcls(MyMixinServer, self._ServerThread__svrcls):
  94.             pass
  95.  
  96.         if verbose:
  97.             print 'thread: creating server'
  98.         
  99.         svr = svrcls(self._ServerThread__addr, self._ServerThread__hdlrcls)
  100.         if verbose:
  101.             print 'thread: serving three times'
  102.         
  103.         svr.serve_a_few()
  104.         if verbose:
  105.             print 'thread: done'
  106.         
  107.  
  108.  
  109. seed = 0
  110.  
  111. def pickport():
  112.     global seed
  113.     seed += 1
  114.     return 10000 + (os.getpid() % 1000) * 10 + seed
  115.  
  116. host = 'localhost'
  117. testfiles = []
  118.  
  119. def pickaddr(proto):
  120.     if proto == socket.AF_INET:
  121.         return (host, pickport())
  122.     else:
  123.         fn = TESTFN + str(pickport())
  124.         if os.name == 'os2':
  125.             if fn[1] == ':':
  126.                 fn = fn[2:]
  127.             
  128.             if fn[0] in (os.sep, os.altsep):
  129.                 fn = fn[1:]
  130.             
  131.             fn = os.path.join('\\socket', fn)
  132.             if os.sep == '/':
  133.                 fn = fn.replace(os.sep, os.altsep)
  134.             else:
  135.                 fn = fn.replace(os.altsep, os.sep)
  136.         
  137.         testfiles.append(fn)
  138.         return fn
  139.  
  140.  
  141. def cleanup():
  142.     for fn in testfiles:
  143.         
  144.         try:
  145.             os.remove(fn)
  146.         continue
  147.         except os.error:
  148.             continue
  149.         
  150.  
  151.     
  152.     testfiles[:] = []
  153.  
  154.  
  155. def testloop(proto, servers, hdlrcls, testfunc):
  156.     for svrcls in servers:
  157.         addr = pickaddr(proto)
  158.         if verbose:
  159.             print 'ADDR =', addr
  160.             print 'CLASS =', svrcls
  161.         
  162.         t = ServerThread(addr, svrcls, hdlrcls)
  163.         if verbose:
  164.             print 'server created'
  165.         
  166.         t.start()
  167.         if verbose:
  168.             print 'server running'
  169.         
  170.         for i in range(NREQ):
  171.             time.sleep(DELAY)
  172.             if verbose:
  173.                 print 'test client', i
  174.             
  175.             testfunc(proto, addr)
  176.         
  177.         if verbose:
  178.             print 'waiting for server'
  179.         
  180.         t.join()
  181.         if verbose:
  182.             print 'done'
  183.             continue
  184.     
  185.  
  186. tcpservers = [
  187.     TCPServer,
  188.     ThreadingTCPServer]
  189. if hasattr(os, 'fork') and os.name not in ('os2',):
  190.     tcpservers.append(ForkingTCPServer)
  191.  
  192. udpservers = [
  193.     UDPServer,
  194.     ThreadingUDPServer]
  195. if hasattr(os, 'fork') and os.name not in ('os2',):
  196.     udpservers.append(ForkingUDPServer)
  197.  
  198. if not hasattr(socket, 'AF_UNIX'):
  199.     streamservers = []
  200.     dgramservers = []
  201. else:
  202.     
  203.     class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer):
  204.         pass
  205.  
  206.     streamservers = [
  207.         UnixStreamServer,
  208.         ThreadingUnixStreamServer]
  209.     if hasattr(os, 'fork') and os.name not in ('os2',):
  210.         streamservers.append(ForkingUnixStreamServer)
  211.     
  212.     
  213.     class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer):
  214.         pass
  215.  
  216.     dgramservers = [
  217.         UnixDatagramServer,
  218.         ThreadingUnixDatagramServer]
  219.     if hasattr(os, 'fork') and os.name not in ('os2',):
  220.         dgramservers.append(ForkingUnixDatagramServer)
  221.     
  222.  
  223. def testall():
  224.     testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream)
  225.     testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram)
  226.     if hasattr(socket, 'AF_UNIX'):
  227.         testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream)
  228.     
  229.  
  230.  
  231. def test_main():
  232.     import imp as imp
  233.     if imp.lock_held():
  234.         raise TestSkipped("can't run when import lock is held")
  235.     
  236.     
  237.     try:
  238.         testall()
  239.     finally:
  240.         cleanup()
  241.  
  242.  
  243. if __name__ == '__main__':
  244.     test_main()
  245.  
  246.